1   /*
2    * Copyright (C) 2008 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import com.google.common.base.Preconditions;
20  
21  import java.util.ArrayDeque;
22  import java.util.Queue;
23  import java.util.concurrent.Executor;
24  import java.util.logging.Level;
25  import java.util.logging.Logger;
26  
27  import javax.annotation.concurrent.GuardedBy;
28  
29  /**
30   * Executor ensuring that all Runnables submitted are executed in order,
31   * using the provided Executor, and serially such that no two will ever
32   * be running at the same time.
33   *
34   * TODO(user): The tasks are given to the underlying executor as a single
35   * task, which means the semantics of the executor may be changed, e.g. the
36   * executor may have an afterExecute method that runs after every task
37   *
38   * TODO(user): What happens in case of shutdown or shutdownNow?  Should
39   * TaskRunner check for interruption?
40   *
41   * TODO(user): It would be nice to provide a handle to individual task
42   * results using Future.  Maybe SerializingExecutorService?
43   *
44   * @author JJ Furman
45   */
46  final class SerializingExecutor implements Executor {
47    private static final Logger log =
48        Logger.getLogger(SerializingExecutor.class.getName());
49  
50    /** Underlying executor that all submitted Runnable objects are run on. */
51    private final Executor executor;
52  
53    /** A list of Runnables to be run in order. */
54    @GuardedBy("internalLock")
55    private final Queue<Runnable> waitQueue = new ArrayDeque<Runnable>();
56  
57    /**
58     * We explicitly keep track of if the TaskRunner is currently scheduled to
59     * run.  If it isn't, we start it.  We can't just use
60     * waitQueue.isEmpty() as a proxy because we need to ensure that only one
61     * Runnable submitted is running at a time so even if waitQueue is empty
62     * the isThreadScheduled isn't set to false until after the Runnable is
63     * finished.
64     */
65    @GuardedBy("internalLock")
66    private boolean isThreadScheduled = false;
67  
68    /** The object that actually runs the Runnables submitted, reused. */
69    private final TaskRunner taskRunner = new TaskRunner();
70  
71    /**
72     * Creates a SerializingExecutor, running tasks using {@code executor}.
73     *
74     * @param executor Executor in which tasks should be run. Must not be null.
75     */
76    public SerializingExecutor(Executor executor) {
77      Preconditions.checkNotNull(executor, "'executor' must not be null.");
78      this.executor = executor;
79    }
80  
81    private final Object internalLock = new Object() {
82      @Override public String toString() {
83        return "SerializingExecutor lock: " + super.toString();
84      }
85    };
86  
87    /**
88     * Runs the given runnable strictly after all Runnables that were submitted
89     * before it, and using the {@code executor} passed to the constructor.     .
90     */
91    @Override
92    public void execute(Runnable r) {
93      Preconditions.checkNotNull(r, "'r' must not be null.");
94      boolean scheduleTaskRunner = false;
95      synchronized (internalLock) {
96        waitQueue.add(r);
97  
98        if (!isThreadScheduled) {
99          isThreadScheduled = true;
100         scheduleTaskRunner = true;
101       }
102     }
103     if (scheduleTaskRunner) {
104       boolean threw = true;
105       try {
106         executor.execute(taskRunner);
107         threw = false;
108       } finally {
109         if (threw) {
110           synchronized (internalLock) {
111             // It is possible that at this point that there are still tasks in
112             // the queue, it would be nice to keep trying but the error may not
113             // be recoverable.  So we update our state and propogate so that if
114             // our caller deems it recoverable we won't be stuck.
115             isThreadScheduled = false;
116           }
117         }
118       }
119     }
120   }
121 
122   /**
123    * Task that actually runs the Runnables.  It takes the Runnables off of the
124    * queue one by one and runs them.  After it is done with all Runnables and
125    * there are no more to run, puts the SerializingExecutor in the state where
126    * isThreadScheduled = false and returns.  This allows the current worker
127    * thread to return to the original pool.
128    */
129   private class TaskRunner implements Runnable {
130     @Override
131     public void run() {
132       boolean stillRunning = true;
133       try {
134         while (true) {
135           Preconditions.checkState(isThreadScheduled);
136           Runnable nextToRun;
137           synchronized (internalLock) {
138             nextToRun = waitQueue.poll();
139             if (nextToRun == null) {
140               isThreadScheduled = false;
141               stillRunning = false;
142               break;
143             }
144           }
145 
146           // Always run while not holding the lock, to avoid deadlocks.
147           try {
148             nextToRun.run();
149           } catch (RuntimeException e) {
150             // Log it and keep going.
151             log.log(Level.SEVERE, "Exception while executing runnable "
152                 + nextToRun, e);
153           }
154         }
155       } finally {
156         if (stillRunning) {
157           // An Error is bubbling up, we should mark ourselves as no longer
158           // running, that way if anyone tries to keep using us we won't be
159           // corrupted.
160           synchronized (internalLock) {
161             isThreadScheduled = false;
162           }
163         }
164       }
165     }
166   }
167 }